package org.lisen.scdemo.receiver.service;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketException;
import java.util.concurrent.TimeUnit;

/**
 * @author Administrator
 * @create 2020-02-2422:30
 */
@Component
@Slf4j
public class ReceiverConfirmDemo {

    @RabbitListener(queues = "direct.queue")
    @RabbitHandler
    public void message(Message message, Channel channel) throws IOException {

        log.info("消息内容: {}",new String(message.getBody()));

        /**
         * 模拟业务处理方法.
         * 对应业务方法中出现的异常需要区分对待，例如以下情况：
         * 1）网络异常等可能恢复的异常，可以设置消息重新返回到队列，以便于重新处理
         * 2）对应业务数据等不可恢复的异常，则可以进行补偿操作，或放入死信队列进行人工干预
         */
        try {
            log.info("正在处理 ....");
            TimeUnit.SECONDS.sleep(5);

            long deliveryTag = message.getMessageProperties().getDeliveryTag();

            //模拟在业务处理是发生了网络异常，如：在连接数据库保存数据时网络发生了抖动
            //此类异常是可以恢复的，需要要消息重新返回队列，以便于下次处理
            if(deliveryTag % 2 == 0) {
                throw new ConnectException("模拟消息消费者发生网络异常");
            }

            //模拟发生不可恢复异常，此种情况消息重新入队没有意义
            if(deliveryTag % 3 == 0) {
                throw new ClassCastException("模拟消息消费者发生不可恢复异常");
            }

        } catch (SocketException se) {

            log.info("SocketException: {}", se.getMessage());

            //拒绝deliveryTag对应的消息，第二个参数是否requeue，true则重新入队列，false则不会重新入队
            //如果配置了死信队列则消息会被投递到死信队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

            //不确认deliveryTag对应的消息，第二个参数是否应用于多消息，第三个参数是否requeue，
            //与basic.reject区别就是同时支持多个消息，可以nack该消费者先前接收未ack的所有消息。
            // nack后的消息也会被自己消费到
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

            //是否恢复消息到队列，参数是是否requeue，true则重新入队列，
            // 并且尽可能的将之前recover的消息投递给其他消费者消费，
            //而不是自己再次消费。false则消息会重新被投递给自己
            //channel.basicRecover(true);
            return;
        } catch (Exception e) {
            //此处处理无法恢复的异常，可记录日志或将消息发送到指定的队列以便于后续的处理
            log.info("Exception: {}", e.getMessage());
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            return;
        }

        log.info("处理完毕， 发送ack确认 .... ");
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}
